Stacked Auto-Encoders

Este Notebook introduce el concepto de los stacked autoencoders. Para ello necesitamos crear una clase que codifique una capa del tipo Denoising Auto-Encoders.

Denoising Auto-Encoders

Una capa Auto-Encoder dispone de un encoder y un decoder que realizan el aprendizaje de los patrones de entrada. Básicamente se puede expresar esto matemáticamente como:

Encoder:

\begin{align} y &= s(W*x +b) \end{align}

Siendo s la función sigmoidea, por ejemplo.

Decoder:

\begin{align} z &= s(W'*x+b') \end{align}

W' sería el mapeo inverso, y z sería la reconstrucción de x (datos de entrada). Una forma de calcular W' sería:

\begin{align} W' &= W^T \end{align}

Con lo cual tenemos tres parámetros en esta capa:

\begin{align} {W, b, b'} \end{align}

Error de reconstrucción

Se puede calcular de muchas formas, pero de forma sencilla se puede definir como la entropía cruzada:

\begin{align} L_H(x,z) &= -\sum_{k=1}^d[X_k*log(Z_k) + (1-X_k)*log(1-Z_k)] \end{align}

Añadiendo la capacidad "denoising"

Si utilizamos el esquema anterior, el Auto-Encoder representará la identificación únicamente de los datos de entrada. Pero es mucho más ventajoso que el Auto-Encoder extraiga las representaciones o características más representativas de los datos.

Una sencilla técnica es introducir una serie de distorsiones, o más bien dicho, corrompemos la entrada de una determinada forma. Por ejemplo, eliminando alguno de los datos de entrada (puestos a cero). En theano esto se puede realizar con la función theano_rng.binomial, que devuelve un vector con 1s y 0s, usados para filtrar/corromper los datos de entrada.

Codificación en Theano


In [ ]:
class dA(object):
    """Denoising Auto-Encoder class (dA)
    """

    def __init__(
        self,
        numpy_rng,
        theano_rng=None,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        bhid=None,
        bvis=None
    ):
        self.n_visible = n_visible
        self.n_hidden = n_hidden

        # create a Theano random generator that gives symbolic random values
        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # note : W' was written as `W_prime` and b' as `b_prime`
        if not W:
            initial_W = numpy.asarray(
                numpy_rng.uniform(
                    low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if not bvis:
            bvis = theano.shared(
                value=numpy.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                borrow=True
            )

        if not bhid:
            bhid = theano.shared(
                value=numpy.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='b',
                borrow=True
            )

        self.W = W
        # b corresponds to the bias of the hidden
        self.b = bhid
        # b_prime corresponds to the bias of the visible
        self.b_prime = bvis
        # tied weights, therefore W_prime is W transpose
        self.W_prime = self.W.T
        self.theano_rng = theano_rng
        # if no input is given, generate a variable representing the input
        if input is None:
            # we use a matrix because we expect a minibatch of several
            # examples, each example being a row
            self.x = T.dmatrix(name='input')
        else:
            self.x = input

        self.params = [self.W, self.b, self.b_prime]
        
    def get_corrupted_input(self, input, corruption_level):
        return self.theano_rng.binomial(size=input.shape, n=1,
                                        p=1 - corruption_level,
                                        dtype=theano.config.floatX) * input

    def get_hidden_values(self, input):
        """ Computes the values of the hidden layer """
        return T.nnet.sigmoid(T.dot(input, self.W) + self.b)

    def get_reconstructed_input(self, hidden):
        """Computes the reconstructed input given the values of the
        hidden layer

        """
        return T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime)

    def get_cost_updates(self, corruption_level, learning_rate):
        """ This function computes the cost and the updates for one trainng
        step of the dA """

        tilde_x = self.get_corrupted_input(self.x, corruption_level)
        y = self.get_hidden_values(tilde_x)
        z = self.get_reconstructed_input(y)
        L = - T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1)
        cost = T.mean(L)

        # compute the gradients of the cost of the `dA` with respect
        # to its parameters
        gparams = T.grad(cost, self.params)
        # generate the list of updates
        updates = [
            (param, param - learning_rate * gparam)
            for param, gparam in zip(self.params, gparams)
        ]

        return (cost, updates)

Stacked Auto-Encoders

En la sección anterior hemos preparado la clase necesaria para incluir un Denoising Auto-Encoder (DaE) en nuestras redes.

El ejemplo que proponemos en este Notebook es un MLP de dos capas ocultas y una capa de salida (Logistic Regression).

Esta será la red sobre la que vamos a incluir las capas DaE, es decir, un simple MLP:

Y la red resultante que obtendremos con la inclusión de las correspondientes capas DaE será la siguiente:

Esta nueva red se convertiría en una red del tipo Stacked Auto-Encoders (SaE).

En este ejemplo en concreto vamos a clasificar imágenes de 20x20 pixels de números escritos a mano desde el 0 al 9. Los datos de entrenamiento están guardados en el fichero digits.mat de matlab.

Propiedades

  • Los parámetros W y b de las capas Sigmoidea y DaE serán los mismos. Es decir, si modificamos los parámetros W y b de la capa DaE se modificarán los correspondientes a la capa Sigmoidea.
  • La base de las SaE es la realizar un pre-entrenamiento únicamente de las capas DaE, seguido de un proceso de ajuste fino en la que solo se entrenan las capas sigmoideas y la capa de salida.

Pre-entrenamiento

  • Paso 1: Se entrena la capa {DaE_0} con los datos de entrada
  • Paso 2: Obtener la salida de la capa {Sigmoid_0}. (Nota: Mismos W y b que {DaE_0})
  • Paso 3: Para el resto de las capas (i = 1 to n, siendo n el número de capas restantes):
  • Paso 3.1: Entrenar la capa {DaE_i} con los datos de salida de la capa {Sigmoid_(i-1)}.
  • Paso 3.2: Obtener la salida de la capa {Sigmoid_i} con los datos de salida de la capa {Sigmoid(i-1)}.

En theano:


In [ ]:
import scipy.io as io

import numpy
import matplotlib.pyplot as plt

import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from mlp import CapaOculta, LogisticRegression
from dA import dA

In [ ]:
class SdA_MLP_Layer(object):
    def __init__(self, input, numpy_rng, theano_rng=None, n_ins=784, hidden_layers_size=500, corruption_level=.1):

        # Tensor de entrada
        self.input = input

        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # Capa sigmoidea
        self.sigmoid_layer = CapaOculta(rng=numpy_rng, input=input, n_in=n_ins, n_out=hidden_layers_size, activation=T.nnet.sigmoid)

            # Capa Denoising Autoencoder
        self.dA_layer = dA(numpy_rng=numpy_rng, theano_rng=theano_rng, input=input, n_visible=n_ins, n_hidden=hidden_layers_size, W=self.sigmoid_layer.W, bhid=self.sigmoid_layer.b)

        self.params = self.sigmoid_layer.params

        self.output = self.sigmoid_layer.output

    def pretraining(self, train_input, batch_size):
        # Creamos los tensores
        corruption_level = T.scalar('corruption')
        learning_rate = T.scalar('lr')
        index = T.iscalar('index')

        # Calculamos los indices del lote
        batch_begin = index * batch_size
        batch_end = batch_begin + batch_size

        # Calculamos los costes y las actualizaciones
        cost, updates = self.dA_layer.get_cost_updates(corruption_level, learning_rate)

        fn = theano.function(
            inputs=[
                index,
                theano.Param(corruption_level, default=0.2),
                theano.Param(learning_rate, default=0.1)],
            outputs=cost,
            updates=updates,
            givens={self.input: train_input[batch_begin:batch_end]})
        return fn

    def getOut(self, inputData, num_inputs):
        index = T.iscalar('index')
        batch_begin = index * num_inputs
        batch_end = (index+1) * num_inputs

        n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size

        salida = theano.function(
            inputs=[index],
            outputs=self.output,
            givens={self.input: inputData[batch_begin:batch_end]})
        return salida(0)

In [ ]:
class SdA_MLP_network(object):
    def __init__(self, input, output, hidden_layers_sizes=[500, 500], corruption_levels=[0.1, 0.1], n_outs=10, n_ins=400):

        numpy_rng = numpy.random.RandomState(89677)

        self.input = input
        self.output = output

        self.corruption_levels = corruption_levels

        self.layer0 = SdA_MLP_Layer(
            input=input,
            numpy_rng=numpy_rng,
            n_ins=n_ins,
            hidden_layers_size=hidden_layers_sizes[0],
            corruption_level=corruption_levels[0])

        self.layer1 = SdA_MLP_Layer(
            input=self.layer0.output,
            numpy_rng=numpy_rng,
            n_ins=hidden_layers_sizes[0],
            hidden_layers_size=hidden_layers_sizes[1],
            corruption_level=corruption_levels[1])

        self.log_layer = LogisticRegression(
            input=self.layer1.output,
            n_in=hidden_layers_sizes[-1],
            n_out=n_outs)

        self.params = self.log_layer.params + self.layer0.params + self.layer1.params

        self.finetune_cost = self.log_layer.negative_log_likelihood(output)
        self.errors = self.log_layer.errors(output)

    def pretraining(self, inputData, batch_size, pretraining_epochs, pretrain_lr):

        n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size

        fn0 = self.layer0.pretraining(inputData, batch_size)

        for epoch in xrange(pretraining_epochs):
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(fn0(
                    index=batch_index,
                    corruption=self.corruption_levels[0],
                    lr=pretrain_lr))
            print 'Pre-training capa 0, epoch %d, cost %f' % (epoch, numpy.mean(c))

        output = self.layer0.getOut(inputData, inputData.get_value().shape[0])

        out = theano.shared(numpy.asarray(output, dtype=theano.config.floatX), borrow=True)

        fn1 = self.layer1.pretraining(out, batch_size)

        for epoch in xrange(pretraining_epochs):
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(fn1(
                    index=batch_index,
                    corruption=self.corruption_levels[1],
                    lr=pretrain_lr))
            print 'Pre-training capa 1, epoch %d, cost %f' % (epoch, numpy.mean(c))

    def fine_tune(self, inputData, outputData, batch_size, learning_rate):
        # Indice neceario
        index = T.iscalar('index')

        gparams = T.grad(self.finetune_cost, self.params)
        updates = [(param, param - gparam * learning_rate) for param, gparam in zip(self.params, gparams)]

        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.input: inputData[index * batch_size:(index + 1) * batch_size],
                self.output: outputData[index * batch_size:(index + 1) * batch_size]}, name='train')

        return train_fn
    
    def training(self, inputData, outputData, batch_size, n_epochs, learning_rate):
        print "...entrenando"

        n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size

        train_fn = self.fine_tune(inputData, outputData, batch_size, learning_rate)

        epoch = 0
        coste = numpy.zeros((n_epochs, 1))
        epoca = []
        coste = []
        while (epoch < n_epochs):
            epoch = epoch + 1
            minibatch_avg_cost = 0
            for minibatch_index in xrange(n_train_batches):
                minibatch_avg_cost = minibatch_avg_cost + train_fn(minibatch_index)
            print 'Training MLP, epoch %d, cost %f' % (epoch, minibatch_avg_cost/n_train_batches)
            coste.append(minibatch_avg_cost/n_train_batches)
            epoca.append(epoch)
        plt.plot(epoca, coste)
        plt.show()

    def test(self, inputData, outputData, batch_size):
        index = T.iscalar('index')
        n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size

        test_fn = theano.function(
            inputs=[index],
            outputs=self.errors,
            givens={
                self.input: inputData[index * batch_size:(index + 1) * batch_size],
                self.output: outputData[index * batch_size:(index + 1) * batch_size]}, name='test')

        def test_score():
            return [test_fn(i) for i in xrange(n_train_batches)]
        print 1. - numpy.mean(test_score())

In [ ]:
finetune_lr = 0.1
pretraining_epochs = 100
corruption_levels = [.1, .2]
pretrain_lr = 0.001
training_epochs = 200
dataset = 'digits.mat'
batch_size = 500

print "...cargando datos"
data = io.loadmat(dataset, squeeze_me=True)

dataIn = data['X'].astype(float)
dataOut = data['y'].astype(int)

for i in range(dataOut.shape[0]):
    if (dataOut[i] == 10):
        dataOut[i] = 0

train_set_x = theano.shared(numpy.asarray(dataIn, dtype=theano.config.floatX), borrow=True)
train_set_y = T.cast(theano.shared(numpy.asarray(dataOut, dtype=theano.config.floatX), borrow=True), 'int32')

n_train_batches = train_set_x.get_value(borrow=True).shape[0]
n_train_batches /= batch_size

x = T.matrix('x')  # Datos de entrada
y = T.ivector('y')	# Salida esperada

sda = SdA_MLP_network(
    x,
    y,
    n_outs=10,
    n_ins=20 * 20)


sda.pretraining(train_set_x, batch_size, pretraining_epochs, pretrain_lr)

sda.training(train_set_x, train_set_y, batch_size, training_epochs, finetune_lr)

sda.test(train_set_x, train_set_y, batch_size)

Práctica

1. Experimenta con los parámetros de la red


In [ ]:

2. Crea dos datasets para training y test


In [ ]: